Message Formats & Advanced Features
Message Structure
Basic Message Format
{
"_id": "60c7a9b1...", // MongoDB ObjectId
"sender_id": 123, // User ID of sender
"recipient_id": 456, // User ID of recipient
"sender_oid": 789, // Optional: Organization ID of sender
"recipient_oid": 101, // Optional: Organization ID of recipient
"entity_id": 202, // Optional: Related entity ID
"message": "Hello!", // Message content
"attachment": { // Optional: Attachment details
"file_name": "document.pdf",
"file_url": "https://..."
},
"timestamp": 1623456789, // UNIX timestamp
"is_read": false, // Read status
"is_delivered": false, // Delivery status
"priority": "normal", // Message priority
"is_deleted": false, // Deletion status
"hash": "abc123..." // Unique message hash
}
Message Types
-
User-to-User Messages
{
"sender_id": 123,
"recipient_id": 456,
"message": "Hello!"
} -
Organization-to-User Messages
{
"sender_id": 123,
"sender_oid": 789,
"recipient_id": 456,
"message": "Official notification"
} -
Organization-to-Organization Messages
{
"sender_id": 123,
"sender_oid": 789,
"recipient_oid": 101,
"message": "Inter-organization communication"
}
Advanced Features
1. Message Hashing
Messages are assigned unique hashes based on:
- Sender ID
- Recipient ID
- Message content
- Timestamp
- Organization IDs (if applicable)
def generate_message_hash(sender_id, recipient_id, content, timestamp, from_oid=None, to_oid=None):
message_string = f"{sender_id}:{recipient_id}:{content}:{timestamp}"
if from_oid:
message_string += f":from_oid={from_oid}"
if to_oid:
message_string += f":to_oid={to_oid}"
return hashlib.sha256(message_string.encode('utf-8')).hexdigest()
2. Offline Message Delivery
Messages are stored for offline users with:
- Redis-based queue system
- Automatic delivery upon reconnection
- TTL-based expiration (default: 24 hours)
# Redis key format for offline messages
f"offline_messages:{type}:{recipient_id}:{entity_id}"
3. Conversation Management
Get unique conversations with enriched data:
- User details
- Organization details
- Entity information
- Message statistics
{
"total_conversations": 10,
"conversations": [
{
"from_id": {
"uid": 123,
"first_name": "John",
"last_name": "Doe",
"photo": "url...",
"username": "johndoe"
},
"to_id": {
"uid": 456,
"first_name": "Jane",
"last_name": "Smith",
"photo": "url...",
"username": "janesmith"
},
"from_oid": {
"oid": 789,
"name": "Organization A",
"photo": "url...",
"verified": true
},
"entity_id": {
"eid": 101,
"name": "Project X",
"type": "project",
"Asset": {
"url": "url...",
"type": "image/png"
}
},
"total_messages": 50,
"unread_count": 5,
"latest_timestamp": 1623456789
}
]
}
4. Feature Flags
Toggle features through configuration:
{
"enable_read_receipts": true,
"enable_user_presence": true,
"enable_offline_delivery": true,
"enable_typing_indicators": true,
"enable_rate_limiting": true
}
5. Message Queue Architecture
RabbitMQ configuration for reliable message delivery:
# Exchange and queue setup
exchange_name = "messaging_exchange"
queue_name = "chat_messages"
routing_key = "chat_routing"
# Declare exchange
channel.exchange_declare(
exchange=exchange_name,
exchange_type="direct",
durable=True
)
# Declare queue
channel.queue_declare(
queue=queue_name,
durable=True
)
# Bind queue to exchange
channel.queue_bind(
queue=queue_name,
exchange=exchange_name,
routing_key=routing_key
)
6. Real-time Presence Tracking
Redis-based presence system:
# User presence key format
f"user_presence:{user_id}"
# Organization presence key format
f"org_presence:{org_id}"
# Session tracking
f"user_sids:{user_id}"
7. Rate Limiting
Configurable rate limits:
RATELIMIT_DEFAULT = "200 per day;50 per hour"
Implementation using Redis:
from flask_limiter import Limiter
from flask_limiter.util import get_remote_address
limiter = Limiter(
key_func=get_remote_address,
storage_uri="redis://localhost:6379/3",
default_limits=["200 per day", "50 per hour"]
)
8. Message Search
MongoDB text search implementation:
# Create text index
db.messages.create_index([("message", "text")])
# Search messages
results = db.messages.find(
{
"$text": {"$search": keyword},
"$or": [
{"sender_id": user_id},
{"recipient_id": user_id}
]
},
{"score": {"$meta": "textScore"}}
).sort([("score", {"$meta": "textScore"})])
Integration Examples
1. Complete Message Flow
// Client-side implementation
const messagingService = {
async sendMessage(recipient, message) {
// Try WebSocket first
if (socket.connected) {
socket.emit("new_message", {
to: recipient.id,
message: message
});
} else {
// Fallback to REST
await fetch('/api/message', {
method: 'POST',
headers: getAuthHeaders(),
body: JSON.stringify({
recipient_id: recipient.id,
message: message
})
});
}
},
setupMessageHandling() {
socket.on("new_message", (data) => {
displayMessage(data);
sendDeliveryReceipt(data);
sendReadReceipt(data);
});
}
};
2. Organization Message Handling
// Organization-specific implementation
const orgMessaging = {
async sendOrgMessage(orgId, message) {
const payload = {
to_oid: orgId,
message: message,
from_oid: currentOrg.id
};
try {
await socket.emit("new_message", payload);
} catch (error) {
console.error("Failed to send organization message:", error);
// Implement fallback strategy
}
}
};